iT邦幫忙

2025 iThome 鐵人賽

DAY 20
0
Rust

DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅系列 第 20

Day 20: 聚合算子 Part 2 - Hash vs Sort Aggregate

  • 分享至 

  • xImage
  •  

前言

在昨天的文章中,我們深入探討了 DataFusion 聚合算子的核心機制並理解聚合操作如何透過狀態管理和 Hash Table 來處理分組數據。然而在特定場景中,純粹的 Hash Aggregation 可能面臨記憶體壓力無法利用已有排序等問題。今天,我們將探討:

  1. Hash Aggregation 的工作原理和適用場景
  2. Sort-based Aggregation(或利用排序的聚合)的原理
  3. 兩種策略的性能權衡
  4. DataFusion 優化器如何選擇聚合策略

Hash Aggregation

核心原理

Hash Aggregation 是昨天我們探討的方法,它的核心思想是使用 Hash Table 來管理分組狀態:

輸入數據流:
  department | salary
  -----------+--------
  IT         | 80000
  Sales      | 60000
  IT         | 90000
  HR         | 70000
  Sales      | 65000

Hash Aggregation 過程:
  1. 為每行計算 hash(department)
     IT    → hash = 12345 → bucket 0
     Sales → hash = 67890 → bucket 1
     HR    → hash = 11111 → bucket 2

  2. 在 Hash Table 中查找或創建分組
     Hash Table:
       bucket 0: IT    → {sum: 0, count: 0}
       bucket 1: Sales → {sum: 0, count: 0}
       bucket 2: HR    → {sum: 0, count: 0}

  3. 逐行更新對應分組的累積器
     處理 "IT, 80000":  IT.sum = 80000, IT.count = 1
     處理 "Sales, 60000": Sales.sum = 60000, Sales.count = 1
     處理 "IT, 90000":  IT.sum = 170000, IT.count = 2
     ...

  4. 輸出所有分組的結果
     IT    → AVG = 170000 / 2 = 85000
     Sales → AVG = 125000 / 2 = 62500
     HR    → AVG = 70000 / 1 = 70000

Hash Aggregation 的優勢

1. 單次掃描(Single-Pass)

Hash Aggregation 最大的優勢是只需一次遍歷輸入數據:

// 偽代碼展示單次掃描的特性
let mut hash_table = HashMap::new();

for batch in input_stream {
    for row in batch {
        let group_key = evaluate_group_by(row);
        let hash_value = hash(group_key);
        
        // 直接在 Hash Table 中查找或創建分組
        let accumulator = hash_table
            .entry(hash_value)
            .or_insert_with(|| create_accumulator());
        
        // 立即更新累積器
        accumulator.update(row.value);
    }
}

// 輸出結果
for (group_key, accumulator) in hash_table {
    output(group_key, accumulator.evaluate());
}

這種單次掃描的特性意味著:

  • 低延遲:不需要等待所有數據到達就可以開始處理
  • 流式友好:適合處理流式數據
  • CPU 高效:每個數據元素只被處理一次

2. 對輸入順序無要求

無論數據以何種順序到達,Hash Aggregation 都能正確處理:

場景 A - 數據集中到達:
  IT, IT, IT, Sales, Sales, HR
  ✓ 正確處理

場景 B - 數據交錯到達:
  IT, Sales, IT, HR, Sales, IT
  ✓ 同樣正確處理

場景 C - 完全隨機:
  Sales, IT, HR, IT, Sales, IT
  ✓ 依然正確處理

這個特性使得 Hash Aggregation 成為默認首選策略

3. 並行友好

Hash Aggregation 非常適合並行處理。在多核 CPU 環境下:

輸入數據(分成 4 個分區):

分區 0: IT, Sales, IT        ┐
分區 1: HR, IT, Sales        ├─→ 各自獨立進行 Hash Aggregation (Partial)
分區 2: Sales, IT, HR        │
分區 3: IT, Sales, Sales     ┘

Partial 結果:
  分區 0: IT → 2, Sales → 1
  分區 1: IT → 1, Sales → 1, HR → 1
  分區 2: IT → 1, Sales → 1, HR → 1
  分區 3: IT → 1, Sales → 2

重分區(按 GROUP BY 鍵 Hash):
  IT → 分區 0: 收集所有 IT 的部分結果
  Sales → 分區 1: 收集所有 Sales 的部分結果
  HR → 分區 2: 收集所有 HR 的部分結果

Final Aggregation:
  並行合併各分組的結果

Hash Aggregation 的挑戰

1. 記憶體壓力

Hash Table 必須將所有分組的狀態保留在記憶體中:

查詢: SELECT user_id, COUNT(*) FROM logs GROUP BY user_id;

假設:
  - 1000 萬個不同的 user_id
  - 每個分組狀態約 100 bytes(包括 key + accumulator)
  
記憶體需求:
  10,000,000 * 100 bytes = 1 GB

如果有 1 億個不同的 user_id:
  100,000,000 * 100 bytes = 10 GB!

當分組數量(group cardinality)非常高時,記憶體可能不足以容納所有分組。

2. Hash Table 性能衰減

隨著 Hash Table 的增長,性能會逐漸下降:

分組數量與查找時間的關係:

10,000 個分組:
  Hash Table 負載因子 < 0.7
  查找時間: ~O(1),快速

1,000,000 個分組:
  Hash Table 需要多次 resize
  Cache miss 增加
  查找時間: 變慢

10,000,000 個分組:
  Hash Table 可能超過 CPU Cache
  每次查找都會導致 memory access
  查找時間: 顯著變慢

3. Spilling 開銷

當記憶體不足時,DataFusion 會將部分數據 spill 到磁碟

// 簡化的 Spilling 邏輯
fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<()> {
    // 檢查記憶體使用量
    if self.memory_usage > self.memory_limit {
        // 需要 spill 部分數據到磁碟
        self.spill_to_disk()?;
    }
    
    // 正常的聚合邏輯
    self.aggregate_batch(batch)?;
    
    Ok(())
}

fn spill_to_disk(&mut self) -> Result<()> {
    // 1. 選擇要 spill 的分組(通常是最大的或最冷的)
    let groups_to_spill = self.select_groups_to_spill();
    
    // 2. 將這些分組的狀態序列化到磁碟
    let spill_file = self.spill_manager.create_spill_file()?;
    for group in groups_to_spill {
        spill_file.write(group.key, group.accumulator.state())?;
    }
    
    // 3. 從記憶體中移除這些分組
    self.hash_table.remove_groups(groups_to_spill);
    
    Ok(())
}

Spilling 帶來的開銷:

  • 磁碟 I/O:寫入和讀取磁碟都很慢(比記憶體慢 100-1000 倍)
  • 額外的合併階段:需要在最後合併記憶體中的結果和磁碟上的結果
  • 序列化/反序列化:狀態的序列化和反序列化消耗 CPU

DataFusion 中的 Hash Aggregation 實現

讓我們看看 DataFusion 如何處理 Hash Aggregation:

// datafusion/physical-plan/src/aggregates/row_hash.rs
impl GroupedHashAggregateStream {
    fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<()> {
        // 1. 評估 GROUP BY 表達式
        let group_by_values = evaluate_group_by(&self.group_by, &batch)?;
        
        // 2. 評估聚合函數的輸入
        let input_values = evaluate_many(&self.aggregate_arguments, &batch)?;
        
        // 3. 使用 GroupValues 確定每行的分組索引
        let starting_num_groups = self.group_values.len();
        self.group_values.intern(
            &group_by_values,
            &mut self.current_group_indices,
        )?;
        
        // 4. 如果分組數量增長,擴展累積器
        if self.group_values.len() > starting_num_groups {
            self.accumulators
                .iter_mut()
                .try_for_each(|accumulator| {
                    accumulator.resize(self.group_values.len())
                })?;
        }
        
        // 5. 更新每個累積器
        for (accumulator, values) in self.accumulators.iter_mut()
            .zip(input_values.iter())
        {
            accumulator.update_batch(
                values,
                &self.current_group_indices,
                opt_filter,
                self.group_values.len(),
            )?;
        }
        
        // 6. 檢查是否需要 emit 或 spill
        self.emit_if_necessary()?;
        
        Ok(())
    }
}

這個實現體現了 Hash Aggregation 的核心特點:

  • 使用 GroupValues trait 的高效實現來管理分組
  • 使用 GroupsAccumulator 進行批次化更新
  • 動態調整累積器大小以適應新分組
  • 集成記憶體管理和 spilling 機制

Sort-based Aggregation

核心原理

Sort-based Aggregation(基於排序的聚合)是另一種聚合策略,它依賴於輸入數據按 GROUP BY 鍵排序這一前提:

前提: 輸入數據已按 department 排序

輸入數據流:
  department | salary
  -----------+--------
  HR         | 70000    ┐
  HR         | 72000    ├─ HR 組
  HR         | 68000    ┘
  IT         | 80000    ┐
  IT         | 90000    ├─ IT 組
  IT         | 85000    │
  IT         | 95000    ┘
  Sales      | 60000    ┐
  Sales      | 65000    ├─ Sales 組
  Sales      | 63000    ┘

Sort-based Aggregation 過程:
  1. 初始化當前分組: current_group = null
  2. 讀取第一行: department = HR
     - 當前分組為空,創建新分組 HR
     - 累積: HR.sum = 70000, HR.count = 1
  
  3. 讀取第二行: department = HR (相同)
     - 繼續累積到當前分組
     - 累積: HR.sum = 142000, HR.count = 2
  
  4. 讀取第三行: department = HR (相同)
     - 繼續累積
     - 累積: HR.sum = 210000, HR.count = 3
  
  5. 讀取第四行: department = IT (不同!)
     - 當前分組結束,輸出結果: HR → AVG = 210000/3 = 70000
     - 創建新分組 IT
     - 累積: IT.sum = 80000, IT.count = 1
  
  6. 持續處理...

Sort-based Aggregation 的優勢

1. 極低的記憶體佔用

與 Hash Aggregation 最大的不同:Sort-based Aggregation 只需要保留當前分組的狀態

Hash Aggregation 記憶體需求:
  記憶體 = 分組數量 × 每個分組狀態大小
  1000 萬個分組 → 1 GB+ 記憶體

Sort-based Aggregation 記憶體需求:
  記憶體 = 1 × 單個分組狀態大小
  無論多少分組 → 只需幾 KB 記憶體!

實際案例:
  查詢: SELECT user_id, COUNT(*) FROM logs GROUP BY user_id;
  
  Hash:
    - 1 億個用戶 → 需要 10 GB 記憶體
    - 可能需要 spill 到磁碟
  
  Sort-based (假設已排序):
    - 1 億個用戶 → 只需 ~1 KB 記憶體
    - 不需要 spill

這個特性使得 Sort-based Aggregation 在高基數分組記憶體受限環境中非常有吸引力。

2. 無需 Hash Table 維護

不需要 Hash Table 意味著:

  • 無 Hash 計算開銷:省去了對每行計算 hash 的 CPU 時間
  • 無 Hash 衝突處理:避免了衝突解決的額外開銷
  • 無 Hash Table resize:不需要在容量不足時進行昂貴的 rehashing

3. Cache 友好

由於只處理當前分組,數據局部性極好:

記憶體訪問模式:

Hash Aggregation:
  處理 IT 員工 #1   → 訪問 hash_table[IT]
  處理 Sales 員工 #1 → 訪問 hash_table[Sales]  (Cache miss)
  處理 HR 員工 #1   → 訪問 hash_table[HR]     (Cache miss)
  處理 IT 員工 #2   → 訪問 hash_table[IT]     (Cache miss)
  ...
  隨機訪問模式,Cache 命中率低

Sort-based Aggregation:
  處理 HR 員工 #1   → 訪問 current_group
  處理 HR 員工 #2   → 訪問 current_group        (Cache hit)
  處理 HR 員工 #3   → 訪問 current_group        (Cache hit)
  ...
  處理 IT 員工 #1   → 訪問 current_group
  處理 IT 員工 #2   → 訪問 current_group        (Cache hit)
  ...
  順序訪問模式,Cache 命中率極高

Sort-based Aggregation 的挑戰

1. 必須預先排序

這是最大的限制:輸入數據必須按 GROUP BY 鍵排序。

如果數據未排序,需要先進行排序:

場景 A: 數據已經排序(例如從索引掃描獲得)
  無需額外操作 → Sort-based Aggregation 免費獲得!

場景 B: 數據未排序
  需要先排序:
    SortExec → AggregateExec
    ↑
    排序開銷可能很大!

排序成本:
  - 時間複雜度: O(N log N)
  - 空間複雜度: 可能需要額外的 O(N) 空間
  - 如果數據無法全部放入記憶體,需要外部排序(更慢)

2. 需要完整數據(或分區內完整)

Sort-based Aggregation 要求同一分組的所有數據連續出現:

正確的輸入(可以處理):
  HR, HR, HR, IT, IT, Sales, Sales

錯誤的輸入(無法處理):
  HR, IT, HR, Sales, IT, HR, Sales
  ↑           ↑
  HR 分組分散在多處,無法正確聚合

這意味著在分散式環境中,必須先進行 shuffle(重分區),確保同一分組的數據在同一個分區。

3. 不適合流式處理

因為需要排序,Sort-based Aggregation 不適合真正的流式場景:

流式數據:
  時間 0s:  IT, Sales, HR
  時間 1s:  IT, HR, Sales
  時間 2s:  Sales, IT, HR
  ...

問題:
  - 無法保證數據按分組鍵有序到達
  - 如果等待排序,會引入延遲
  - 與流式處理的低延遲目標衝突

DataFusion 如何利用排序

需要注意的是,DataFusion 並沒有實現獨立的 Sort-based Aggregation 算子。相反,它採用了一種混合策略

// DataFusion 的做法:
// 1. 主要使用 Hash Aggregation(GroupedHashAggregateStream)
// 2. 但如果輸入已經部分排序,會利用這個特性來優化

// 來自 update_aggr_exprs 優化規則
fn optimize_aggregate_with_ordering(
    aggr_exec: &AggregateExec,
) -> Result<AggregateExec> {
    let groupby_exprs = aggr_exec.group_expr().input_exprs();
    let input = aggr_exec.input();
    
    // 檢查輸入是否按 GROUP BY 鍵的前綴排序
    let indices = get_ordered_partition_by_indices(&groupby_exprs, input)?;
    
    if !indices.is_empty() {
        // 輸入有序!可以利用這個特性
        // 調整聚合表達式以利用順序
        let requirement = indices
            .iter()
            .map(|&idx| PhysicalSortRequirement::new(
                Arc::clone(&groupby_exprs[idx]),
                None,
            ))
            .collect();
        
        // 某些聚合函數(如 FIRST_VALUE)可以在有序輸入下更高效
        let optimized_aggr_exprs = try_convert_aggregate_if_better(
            aggr_exec.aggr_expr(),
            &requirement,
            input.equivalence_properties(),
        )?;
        
        return Ok(aggr_exec.with_new_aggr_exprs(optimized_aggr_exprs));
    }
    
    Ok(aggr_exec.clone())
}

這種方法的優勢:

  • 靈活性:Hash Aggregation 處理無序數據,同時能利用有序數據的優勢
  • 降低記憶體壓力:當輸入按 GROUP BY 鍵排序時,相同分組的數據會連續到達,可以提前 emit(發出)完成的分組,釋放記憶體
  • 優化特定聚合函數:某些聚合函數(如 FIRST_VALUELAST_VALUE)在有序輸入下可以更高效地實現

兩種策略的性能權衡

讓我們通過具體案例來比較兩種策略:

案例 1:低基數分組(例如:按部門聚合)

SELECT department, AVG(salary), COUNT(*)
FROM employees  -- 100 萬行
GROUP BY department;  -- 只有 10 個部門

Hash Aggregation:

  • 優勢:單次掃描,快速完成
  • 記憶體:只需維護 10 個分組狀態,記憶體開銷極小(幾 KB)
  • ⚠️ Hash 開銷:每行需要計算 hash,但開銷可忽略
  • 總體最優選擇

Sort-based Aggregation:

  • 排序開銷:需要先對 100 萬行排序,O(N log N) = 約 20M 次比較
  • 聚合快:排序後聚合很快
  • 記憶體極低:只需維護 1 個當前分組
  • 總體:排序開銷使得總體更慢
性能對比:
  Hash Aggregation:  100 ms(單次掃描)
  Sort + Aggregate:  500 ms(400 ms 排序 + 100 ms 聚合)
  
  結論: Hash 快 5 倍

案例 2:高基數分組 + 數據已排序(例如:按用戶 ID 聚合,從索引掃描)

SELECT user_id, SUM(purchase_amount)
FROM purchases  -- 10 億行,已按 user_id 排序(索引掃描)
GROUP BY user_id;  -- 1 億個不同用戶

Hash Aggregation:

  • 單次掃描
  • 記憶體壓力巨大:需要維護 1 億個分組 → 約 10-20 GB 記憶體
  • 必然 Spill:會觸發多次 spill 到磁碟
  • Hash Table 性能衰減:隨著分組增多,查找變慢
  • 總體:記憶體不足,大量 spill,性能差

Sort-based Aggregation:

  • 無需排序:數據已經有序!
  • 記憶體極低:只需幾 KB
  • 無 Spill:完全在記憶體中流式處理
  • Cache 友好:順序訪問,高 Cache 命中率
  • 總體最優選擇
性能對比:
  Hash Aggregation:  300 秒(大量 spill I/O)
  Sort-based:        30 秒(流式處理,無 spill)
  
  結論: Sort-based 快 10 倍

DataFusion 優化器的選擇策略

DataFusion 採用以 Hash 為主,智能利用排序的策略。讓我們看看優化器如何工作:

1. 默認選擇:Hash Aggregation

物理規劃器 (PhysicalPlanner) 默認將 LogicalPlan::Aggregate 轉換為 AggregateExec,使用 Hash-based 實現:

// 簡化的物理規劃邏輯
fn create_physical_plan_for_aggregate(
    logical_agg: &Aggregate,
) -> Result<Arc<dyn ExecutionPlan>> {
    // 默認策略:使用 Hash Aggregation
    let aggregate_mode = if needs_two_phase {
        AggregateMode::Partial  // 第一階段
    } else {
        AggregateMode::Single   // 單階段
    };
    
    let aggr_exec = AggregateExec::try_new(
        aggregate_mode,
        group_by_exprs,
        aggr_exprs,
        input_plan,
        input_schema,
    )?;
    
    Ok(Arc::new(aggr_exec))
}

2. 利用已有排序:UpdateAggregateExprWithOrdering 規則

這個優化規則檢查輸入是否按 GROUP BY 鍵排序,並據此優化:

// datafusion/physical-optimizer/src/update_aggr_exprs.rs
impl PhysicalOptimizerRule for UpdateAggregateExprWithOrdering {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        plan.transform_up(|plan| {
            if let Some(aggr_exec) = plan.as_any().downcast_ref::<AggregateExec>() {
                // 只優化第一階段(Partial)
                if !aggr_exec.mode().is_first_stage() {
                    return Ok(Transformed::no(plan));
                }
                
                let input = aggr_exec.input();
                let groupby_exprs = aggr_exec.group_expr().input_exprs();
                
                // 檢查輸入是否按 GROUP BY 的前綴排序
                let ordered_indices = get_ordered_partition_by_indices(
                    &groupby_exprs,
                    input,
                )?;
                
                if ordered_indices.is_empty() {
                    // 沒有排序,無法優化
                    return Ok(Transformed::no(plan));
                }
                
                // 有排序!可以優化
                let sort_requirement: Vec<_> = ordered_indices
                    .iter()
                    .map(|&idx| PhysicalSortRequirement::new(
                        Arc::clone(&groupby_exprs[idx]),
                        None,
                    ))
                    .collect();
                
                // 嘗試將聚合函數轉換為更高效的版本
                let new_aggr_exprs = try_convert_aggregate_if_better(
                    aggr_exec.aggr_expr().to_vec(),
                    &sort_requirement,
                    input.equivalence_properties(),
                )?;
                
                let new_aggr_exec = aggr_exec.with_new_aggr_exprs(new_aggr_exprs);
                Ok(Transformed::yes(Arc::new(new_aggr_exec) as _))
            } else {
                Ok(Transformed::no(plan))
            }
        })
        .data()
    }
}

這個規則的關鍵作用

  1. 檢測排序:識別輸入是否按 GROUP BY 鍵(或其前綴)排序
  2. 優化聚合函數:某些聚合函數在有序輸入下可以使用更高效的實現
  3. 提前發射:雖然仍使用 Hash Table,但可以在分組完成時立即發射結果,減少記憶體佔用

3. 實際觀察優化器的選擇

讓我們通過 EXPLAIN 來觀察優化器的決策:

-- 場景 1: 無序數據
EXPLAIN SELECT department, AVG(salary) 
FROM employees 
GROUP BY department;
Physical Plan:
  AggregateExec: mode=Single, gby=[department@0], aggr=[AVG(salary)]
    ParquetExec: file=employees.parquet, partitions={4}
    
分析:
  - 使用 Hash Aggregation(GroupedHashAggregateStream)
  - 單階段聚合(因為數據量不大)
-- 場景 2: 數據已按 department 排序
EXPLAIN SELECT department, AVG(salary)
FROM employees_sorted  -- 已按 department 排序
GROUP BY department;
Physical Plan:
  AggregateExec: mode=Single, gby=[department@0], aggr=[AVG(salary)]
    -- 注意: 保留了輸入的排序
    ParquetExec: file=employees_sorted.parquet, 
                 output_ordering=[department ASC]
    
分析:
  - 仍使用 AggregateExec(基於 Hash)
  - 但優化器知道輸入有序,可以利用這個特性
  - 聚合函數可能被優化為更高效的版本

4. 手動控制:強制使用排序

在某些情況下,你可以手動在聚合前插入排序:

-- 方法 1: 顯式 ORDER BY(如果後面有需要排序的操作)
SELECT user_id, SUM(amount) as total
FROM transactions
GROUP BY user_id
ORDER BY user_id;  -- 這會在聚合後排序

-- 方法 2: 使用子查詢先排序(強制策略,通常不推薦)
SELECT user_id, SUM(amount) as total
FROM (
    SELECT * FROM transactions ORDER BY user_id
)
GROUP BY user_id;

但通常不推薦手動插入排序,因為:

  • 優化器通常能做出更好的決策
  • 不必要的排序會浪費資源
  • DataFusion 的 Hash Aggregation 已經很高效

小結

今天我們深入探討了聚合策略的選擇:Hash AggregationSort-based Aggregation

Hash Aggregation 的特點

  1. 優勢:單次掃描、並行友好、無需預先排序
  2. 挑戰:高基數分組時記憶體壓力大、可能需要 spill
  3. 適用:默認首選,特別是低到中等基數分組

Sort-based Aggregation 的特點

  1. 優勢:記憶體佔用極低、Cache 友好、適合高基數
  2. 挑戰:需要預先排序(開銷可能很大)
  3. 適用:數據已排序或記憶體嚴重受限時

DataFusion 的策略

  1. 以 Hash 為主:默認使用 Hash Aggregation(GroupedHashAggregateStream)
  2. 智能優化:通過 UpdateAggregateExprWithOrdering 規則利用已有排序
  3. 兩階段聚合:結合分區和重分區實現分散式聚合
  4. 記憶體管理:集成 spilling 機制處理記憶體壓力

聚合策略的選擇沒有絕對的"最佳"策略,只有"最適合"的策略。理解兩種策略的權衡,能幫助我們更好地設計查詢和優化系統。

明天,我們將探討 Join 算子 Part 1 - Hash Join 原理,了解另一個重要的有狀態算子如何高效執行。

參考資料

  1. DataFusion AggregateExec 原始碼
  2. GroupedHashAggregateStream 實現
  3. UpdateAggregateExprWithOrdering 優化規則
  4. GroupsAccumulator Trait
  5. Hash vs Sort Aggregation in Database Systems
  6. Memory Management in DataFusion
  7. DataFusion Spilling Strategy
  8. Efficiently Compiling Efficient Query Plans for Modern Hardware
  9. Apache Spark's Aggregation Optimizations

上一篇
Day 19: 聚合算子 Part 1 - Aggregate 執行流程
下一篇
Day 21: Join 算子 Part 1 - Hash Join 原理
系列文
DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅23
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言